本文讨论了ES中的远程调用RPC
部分的源码实现。
一、前言
1.1 关于ES RPC
ES的分布式通信这块,主要是支持了RPC和REST两种模式(不支持大的package/stream的数据传输)。
集群Node之间的通信,数据的传输,java客户端的请求调用(TransportClient)使用的均是RPC(TCP)。
1.2 关于Netty
ES是基于netty开发的rpc模块(http同样基于netty),采用常规的基于注册/回调
方式进行事件驱动的reactor/proactor
模式(单线程调度,线程池处理)。
题外话,ES作为分布式系统把每个行为都抽象成了Action对象,且采用注册回调的方式进行通信,它的调用基本都是异步的,所有调用全部依赖ES
自己实现的线程池。话说这么多异步事件是不是可以直接抽象成Actor呢。(akka,或者quasar的纤程模型替代不知是否可以)。
二、ES RPC 源码分析
2.1 一个简单例子说明过程
首先上时序图,举最简单的例子引入话题,一次通过es的java Client Api进行Get请求调用(Tcp方式)。
场景1:Client 连接节点A并发送请求,数据在节点B。
场景2:场景1简化版,Client 连接节点A并发送请求,数据就在A直接返回。
关于RPC,这里有一篇文章写的很好你应该知道的RPC原理
2.2 ES中的RPC,源码部分
由于是基于Netty,那么我们直接看源码
首先看发送请求
TransportService(Es通信过程中存请求上下文,以及RPC方法映射,即request对应action的地方),
的 clientHandlers就是异步回调池(根据requestId拿到回调执行),存放的就是requestId以及对应的
TransportResponseHandler,与code1的request是一一对应的。code1,code3 Connection连接是根据node从transport获取的。
code2发送请求最终会执行sendRequestInternal()
code4 client线程每次通过socket调用一次远程接口前,生成一个唯一的ID,即requestID
(requestID必需保证在一个Socket连接里面是唯一的),一般常常使用AtomicLong从0开始累计数字生成唯一ID。code5 构造超时回调。
code6 todo
code7 把线程上下文和callback封装
code8 将处理结果的回调对象callback,存放到全局ConcurrentHashMap里面put(requestID, callback);
code9 放入定时线程池中(类似于ScheduledThreadPool),到时间会调用handleException(e)回调给该connection超时异常。
1 | public class TransportService extends AbstractLifecycleComponent { |
TcpTransport是Netty4Transport和Netty3Transport的父类,利用范型抽象出了Channel所有涉及Channel的均由
子类去实现,其负责封装所有发送请求的逻辑。NodeChannels存入transport容器中(TcpTransport),可执行发送逻辑。
TcpTransportChannel可执行响应逻辑。
发送会根据场景选择channel(recovery,bulk,reg,state,ping分别对应不同的channel[],个数也不同,其创建的过程
涉及到节点发现的过程,另文细说),响应就是发给对应的channel。最终执行消息传输的都是Transport(TcpTransport)。真正的发送逻辑是sendRequestToChannel(),最终执行code12发送消息。
1 | public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent implements Transport { |
发送响应
- 服务端接收到请求并处理后,将response结果(此结果中包含了前面的requestID)发送给客户端
接收消息
客户端socket连接上专门监听消息的线程收到消息,分析结果,取到requestID。
messageReceived是比较常见的解析数据包的过程,es自己通过XContent实现的序列化协议,所以代码可读性稍差,作者
自己通过mssagepack重写了这部分,详见别文。然叫首先交由TransportService.Adapter 从前面的ConcurrentHashMap里面get(requestID)出callback对象,
取消超时任务再交由线程池执行回调code13。code13执行的过程其实就是执行发送消息时的幂名内部类(也叫回调),
通常是交由channel去做异步通知(相当于非本地节点还在监听response),或者是Aqs释放本地阻塞(本地是调用发起方,见[源码]Elasticsearch源码1(通信机制之Future)/))。
1 | public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent implements Transport { |
1 | public class TransportService extends AbstractLifecycleComponent { |